/*
 * Copyright (c) 2024 Yunshan Networks
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package decoder

import (
	"context"
	"fmt"
	"net/url"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/deepflowio/deepflow/server/ingester/common"
	"github.com/deepflowio/deepflow/server/ingester/flow_tag"
	profile_common "github.com/deepflowio/deepflow/server/ingester/profile/common"
	"github.com/deepflowio/deepflow/server/ingester/profile/dbwriter"
	"github.com/deepflowio/deepflow/server/libs/codec"
	"github.com/deepflowio/deepflow/server/libs/datatype"
	"github.com/deepflowio/deepflow/server/libs/flow-metrics/pb"
	"github.com/deepflowio/deepflow/server/libs/grpc"
	"github.com/deepflowio/deepflow/server/libs/queue"
	"github.com/deepflowio/deepflow/server/libs/receiver"
	"github.com/deepflowio/deepflow/server/libs/stats"
	"github.com/deepflowio/deepflow/server/libs/utils"
	"github.com/google/uuid"
	logging "github.com/op/go-logging"
	"github.com/pyroscope-io/pyroscope/pkg/convert/jfr"
	"github.com/pyroscope-io/pyroscope/pkg/convert/pprof"
	"github.com/pyroscope-io/pyroscope/pkg/ingestion"
	"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
	"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
)

// log is the package-level logger used by the profile decoder; it is
// registered under the module name "profile.decoder".
var log = logging.MustGetLogger("profile.decoder")

const (
	// BUFFER_SIZE is the length of the batch buffer that Run hands to the
	// input queue on every read.
	BUFFER_SIZE = 1024

	// UNICODE_NULL is the NUL character; buildMetaData trims it from both
	// ends of the process name before using it as the profile name.
	UNICODE_NULL = "\x00"
)

// Counter holds the decoder statistics exported through the `statsd` tags.
// GetCounter hands the current instance to the caller and replaces it with a
// zeroed one, so every value covers a single reporting interval.
type Counter struct {
	// RawCount is incremented in Run for every non-nil queue item. The
	// per-format counts are incremented in handleProfileData: "jfr" counts as
	// Java, "pprof" and multipart payloads with an empty format count as
	// Golang, and the remaining empty-format payloads count as eBPF.
	RawCount           int64 `statsd:"raw-count"`
	JavaProfileCount   int64 `statsd:"java-profile-count"`
	GolangProfileCount int64 `statsd:"golang-profile-count"`
	EBPFProfileCount   int64 `statsd:"ebpf-profile-count"`

	// Not updated anywhere in this file; the shared *Counter is handed to the
	// Parser, which presumably maintains these sizes — TODO confirm.
	UncompressSize int64 `statsd:"uncompress-size"`
	CompressedSize int64 `statsd:"compressed-size"`

	// TotalTime accumulates the time Run spends on each batch, in
	// nanoseconds. AvgTime is derived in GetCounter as TotalTime / RawCount.
	TotalTime int64 `statsd:"total-time"`
	AvgTime   int64 `statsd:"avg-time"`

	// Off-CPU splitting statistics; not updated in this file (presumably
	// maintained by the Parser — TODO confirm).
	// NOTE(review): "OffCput" / "off-cput" looks like a typo for "OffCpu", but
	// the field name and metric name are externally visible, so they are kept.
	OffCpuSplitCount     int64 `statsd:"off-cpu-split-count"`
	OffCpuSplitIntoCount int64 `statsd:"off-cpu-split-into-count"`
	OffCputNotSplitCount int64 `statsd:"off-cput-not-split-count"`
}

// spyMap maps a profiler "spy" name to a language label.
// NOTE(review): it is not referenced anywhere in this file; presumably it is
// consumed elsewhere in the package — verify before changing any entry.
var spyMap = map[string]string{
	"gospy":     "Golang",
	"javaspy":   "Java",
	"pyspy":     "python",
	"rbspy":     "ruby",
	"phpspy":    "PHP",
	"dotnetspy": "dotnet",
	"nodespy":   "Node",
	"eBPF":      "eBPF",
}

// eBPFEventType maps a pb.ProfileEventType value, used as the slice index, to
// the event type label that handleProfileData stores in the process tracer.
// The slice only covers the enum values listed here; indexing it with any
// other value is out of range.
var eBPFEventType = []string{
	pb.ProfileEventType_External:     "third-party",
	pb.ProfileEventType_EbpfOnCpu:    "on-cpu",
	pb.ProfileEventType_EbpfOffCpu:   "off-cpu",
	pb.ProfileEventType_EbpfMemAlloc: "mem-alloc",
	pb.ProfileEventType_EbpfMemInUse: "mem-inuse",
}

// Bit flags understood by decompressData. They can be combined: when both are
// set, the zstd layer is removed first and the gzip layer second.
const (
	// Set when the agent (sender) compressed the payload, as signalled by
	// `profile.DataCompressed`; the data is then zstd-decompressed.
	_ZSTD_COMPRESS_FLAG uint8 = 1 // 0x1
	// Set when the profiler (java-sdk) sent gzip-compressed data to the agent;
	// the data is then gzip-decompressed.
	_GZIP_COMPRESS_FLAG uint8 = 1 << 1 // 0x2
)

// Decoder reads received buffers from inQueue, decodes the pb.Profile messages
// they contain and forwards the parsed result to the profile writer and the
// app-service tag writer.
type Decoder struct {
	// Collaborators and settings supplied through NewDecoder. index is
	// reported as the "thread" statistics tag; msgType gates processing in Run
	// (only MESSAGE_TYPE_PROFILE buffers are decoded); compressionAlgo is
	// passed through to every Parser.
	index               int
	msgType             datatype.MessageType
	platformData        *grpc.PlatformInfoTable
	inQueue             queue.QueueReader
	profileWriter       *dbwriter.ProfileWriter
	appServiceTagWriter *flow_tag.AppServiceTagWriter
	compressionAlgo     string

	// Passed through unchanged to every Parser.
	offCpuSplittingGranularity int

	// orgId and teamId are overwritten in Run for each received buffer before
	// its profiles are handled. decompressBuffer is offered (as a zero-length
	// slice) to the zstd decompressor as destination; it is never assigned in
	// this file.
	orgId, teamId    uint16
	decompressBuffer []byte

	// counter is swapped for a fresh instance by GetCounter.
	counter *Counter
	utils.Closable
}

// NewDecoder builds a Decoder wired to the given queue, platform table and
// writers. The returned decoder starts with a zeroed Counter; call Run to
// start consuming inQueue.
func NewDecoder(index int, msgType datatype.MessageType, compressionAlgo string,
	offCpuSplittingGranularity int,
	platformData *grpc.PlatformInfoTable,
	inQueue queue.QueueReader,
	profileWriter *dbwriter.ProfileWriter,
	appServiceTagWriter *flow_tag.AppServiceTagWriter) *Decoder {
	d := new(Decoder)

	// identity and input
	d.index = index
	d.msgType = msgType
	d.inQueue = inQueue

	// lookup table and output writers
	d.platformData = platformData
	d.profileWriter = profileWriter
	d.appServiceTagWriter = appServiceTagWriter

	// parsing options
	d.compressionAlgo = compressionAlgo
	d.offCpuSplittingGranularity = offCpuSplittingGranularity

	d.counter = new(Counter)
	return d
}

// GetCounter returns the statistics accumulated since the previous call as a
// *Counter and installs a fresh, zeroed Counter for the next interval.
// AvgTime is filled in as TotalTime / RawCount when at least one item was
// counted; otherwise it is left untouched.
func (d *Decoder) GetCounter() interface{} {
	snapshot := d.counter
	d.counter = &Counter{}

	if snapshot.RawCount > 0 {
		snapshot.AvgTime = snapshot.TotalTime / snapshot.RawCount
	}
	return snapshot
}

// Run registers the decoder's statistics and then loops forever: it pulls a
// batch of items from the input queue, decodes every *receiver.RecvBuffer in
// the batch and releases it afterwards. Nil items are skipped, items of any
// other type are logged and skipped. The time spent on each batch is added to
// the counter's TotalTime. Run never returns.
func (d *Decoder) Run() {
	tags := stats.OptionStatTags{
		"thread":   strconv.Itoa(d.index),
		"msg_type": datatype.MESSAGE_TYPE_PROFILE.String(),
	}
	common.RegisterCountableForIngester("decoder", d, tags)

	batch := make([]interface{}, BUFFER_SIZE)
	pbDecoder := &codec.SimpleDecoder{}
	for {
		received := d.inQueue.Gets(batch)
		begin := time.Now()
		for _, item := range batch[:received] {
			if item == nil {
				continue
			}
			atomic.AddInt64(&d.counter.RawCount, 1)

			recvBuf, isRecvBuf := item.(*receiver.RecvBuffer)
			if !isRecvBuf {
				log.Warning("get decode queue data type wrong")
				continue
			}

			pbDecoder.Init(recvBuf.Buffer[recvBuf.Begin:recvBuf.End])
			// remember the tenant of this buffer; it is copied into every Parser built for it
			d.orgId = uint16(recvBuf.OrgID)
			d.teamId = uint16(recvBuf.TeamID)
			if d.msgType == datatype.MESSAGE_TYPE_PROFILE {
				d.handleProfileData(recvBuf.VtapID, pbDecoder)
			}
			receiver.ReleaseRecvBuffer(recvBuf)
		}
		d.counter.TotalTime += int64(time.Since(begin))
	}
}

// appServiceTagWrite forwards the app service / app instance of p to the
// app-service tag writer. It does nothing when no writer is configured or when
// both AppService and AppInstance are empty.
func (d *Decoder) appServiceTagWrite(p *dbwriter.InProcessProfile) {
	writer := d.appServiceTagWriter
	if writer == nil {
		return
	}

	hasServiceTag := p.AppService != "" || p.AppInstance != ""
	if !hasServiceTag {
		return
	}

	writer.Write(p.Time, dbwriter.PROFILE_TABLE, p.AppService, p.AppInstance, p.OrgId, p.TeamID)
}

// handleProfileData decodes every pb.Profile message left in decoder and
// dispatches it according to profile.Format:
//
//   - "jfr": counted as a Java profile, always gzip-decompressed (plus zstd
//     when the sender compressed it) and parsed with the jfr parser;
//   - "pprof": counted as a Golang profile, zstd-decompressed when the sender
//     compressed it and parsed with the pprof parser;
//   - "": parsed with the streaming pprof parser when the content type
//     contains "multipart/form-data", otherwise treated as eBPF data and
//     handed to the parser as raw stacks without further parsing;
//   - "speedscope", "tree", "trie", "lines": not implemented, skipped;
//   - any other format is ignored.
//
// Processing of the current buffer stops at the first message that fails to
// decode or to parse; the failure is logged and the remaining messages are
// dropped.
func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder) {
	for !decoder.IsEnd() {
		profile := &pb.Profile{}
		decoder.ReadPB(profile)
		// profile was allocated just above and can never be nil, so only the
		// decoder state has to be checked
		if decoder.Failed() {
			log.Errorf("profile data decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes()))
			return
		}

		parser := &Parser{
			vtapID:                      vtapID,
			orgId:                       d.orgId,
			teamId:                      d.teamId,
			inTimestamp:                 time.Now(),
			profileWriterCallback:       d.profileWriter.Write,
			appServiceTagWriterCallback: d.appServiceTagWrite,
			platformData:                d.platformData,
			IP:                          make([]byte, len(profile.Ip)),
			podID:                       profile.PodId,
			compressionAlgo:             d.compressionAlgo,
			observer:                    &observer{},
			offCpuSplittingGranularity:  d.offCpuSplittingGranularity,
			Counter:                     d.counter,
		}
		// the parser keeps its own copy of the IP instead of aliasing the decoded message
		copy(parser.IP, profile.Ip)

		// for jfr/pprof format, no matter compress or not, it requires decompress to parse profile data
		switch profile.Format {
		case "jfr":
			atomic.AddInt64(&d.counter.JavaProfileCount, 1)
			metadata := d.buildMetaData(profile)
			parser.profileName = metadata.Key.AppName()
			// jfr payloads are always gzip-compressed by the profiler; the agent may add zstd on top
			compressFlag := _GZIP_COMPRESS_FLAG
			if profile.DataCompressed {
				compressFlag |= _ZSTD_COMPRESS_FLAG
			}
			log.Debugf("decode java profile data, compression: %d, data: %v", compressFlag, profile.Data)
			err := d.sendProfileData(&jfr.RawProfile{
				FormDataContentType: string(profile.ContentType),
				RawData:             d.decompressData(profile.Data, compressFlag),
			}, profile.Format, parser, metadata)

			if err != nil {
				log.Errorf("decode java profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
				return
			}
		case "pprof":
			atomic.AddInt64(&d.counter.GolangProfileCount, 1)
			metadata := d.buildMetaData(profile)
			parser.profileName = metadata.Key.AppName()
			var compressFlag uint8 = 0
			if profile.DataCompressed {
				compressFlag = _ZSTD_COMPRESS_FLAG
			}
			log.Debugf("decode golang profile data, compression: %d, data: %v", compressFlag, profile.Data)
			err := d.sendProfileData(&pprof.RawProfile{
				FormDataContentType: string(profile.ContentType),
				RawData:             d.decompressData(profile.Data, compressFlag),
			}, profile.Format, parser, metadata)
			if err != nil {
				log.Errorf("decode golang profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
				return
			}
		case "":
			// if format == "" && contentType has "multipart/form-data", using pprof parser as default, StreamingParser&PoolStreamingParser = true
			if strings.Contains(string(profile.ContentType), "multipart/form-data") {
				atomic.AddInt64(&d.counter.GolangProfileCount, 1)
				metadata := d.buildMetaData(profile)
				parser.profileName = metadata.Key.AppName()
				var compressFlag uint8 = 0
				if profile.DataCompressed {
					compressFlag = _ZSTD_COMPRESS_FLAG
				}
				log.Debugf("decode golang profile data, compression: %d, data: %v", compressFlag, profile.Data)
				err := d.sendProfileData(&pprof.RawProfile{
					FormDataContentType: string(profile.ContentType),
					RawData:             d.decompressData(profile.Data, compressFlag),
					StreamingParser:     true,
					PoolStreamingParser: true,
				}, profile.Format, parser, metadata)
				if err != nil {
					log.Errorf("decode golang profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
					return
				}
			} else {
				atomic.AddInt64(&d.counter.EBPFProfileCount, 1)
				profile = d.filleBPFData(profile)
				metadata := d.buildMetaData(profile)
				parser.profileName = metadata.Key.AppName()
				// the event type comes straight from the wire: never index the label table with it unchecked
				eventType, known := lookupEBPFEventType(profile.EventType)
				if !known {
					log.Warningf("unknown ebpf profile event type %d, recorded as %q", profile.EventType, eventType)
				}
				parser.processTracer = &processTracer{value: profile.WideCount, pid: profile.Pid, stime: int64(profile.Stime), eventType: eventType}
				if profile.WideCount == 0 {
					// adapt agent version before v6.6
					parser.processTracer.value = uint64(profile.Count)
				}
				// for ebpf profiling data, directly write, no need to parse
				log.Debugf("decode ebpf profile data, compression: %t, data: %v", profile.DataCompressed, profile.Data)
				err := parser.rawStackToInProcess(
					profile.Data,
					parser.value,
					metadata.StartTime,
					metadata.Units.String(),
					metadata.SpyName,
					metadata.Key.Labels(),
					profile.DataCompressed,
				)
				if err != nil {
					log.Errorf("decode ebpf profile data failed, offset=%d, len=%d, err=%s", decoder.Offset(), len(decoder.Bytes()), err)
					return
				}
			}
		case "speedscope", "tree", "trie", "lines":
			// not implemented
			continue
		}
	}
}

// lookupEBPFEventType returns the label registered in eBPFEventType for
// eventType and true. For a value outside the table (for example one sent by
// a newer agent) it returns "unknown" and false instead of panicking.
func lookupEBPFEventType(eventType pb.ProfileEventType) (string, bool) {
	idx := int(eventType)
	if idx < 0 || idx >= len(eBPFEventType) {
		return "unknown", false
	}
	return eBPFEventType[idx], true
}

// filleBPFData fills in, in place, the metadata fields that eBPF profiles do
// not carry and returns the same message: spy name "eBPF", sum aggregation,
// "microseconds" units, From derived from the nanosecond Timestamp and Until
// set to the current time (both in seconds).
func (d *Decoder) filleBPFData(profile *pb.Profile) *pb.Profile {
	nowSeconds := time.Now().Unix()

	profile.SpyName = "eBPF"
	profile.AggregationType = string(metadata.SumAggregationType)
	// agent will calculate real time when captured
	profile.Units = "microseconds"
	profile.Until = uint32(nowSeconds)
	// ns to s
	profile.From = uint32(profile.Timestamp / 1e9)
	return profile
}

// buildMetaData derives the ingestion metadata for profile.
//
// The profile name is chosen in this order: the URL-unescaped profile.Name
// (or the raw name when unescaping fails), the NUL-trimmed process name, or a
// generated "profile-empty-service-<uuid>" placeholder. The name is parsed
// as a segment key; if parsing fails, the whole name becomes the "__name__"
// label. The start time is taken from the nanosecond Timestamp when it is set,
// otherwise from the From field in seconds.
func (d *Decoder) buildMetaData(profile *pb.Profile) ingestion.Metadata {
	var name string
	switch {
	case profile.Name != "":
		unescaped, unescapeErr := url.QueryUnescape(profile.Name)
		if unescapeErr != nil {
			log.Debugf("decode profile.name wrong, got %s, will use as profilename", profile.Name)
			unescaped = profile.Name
		}
		name = unescaped
	case profile.ProcessName != "":
		// if profile comes from eBPF, use processName as profileName
		name = strings.Trim(profile.ProcessName, UNICODE_NULL)
	default:
		name = fmt.Sprintf("%s-%s", "profile-empty-service", generateUUID())
	}

	key, parseErr := segment.ParseKey(name)
	if parseErr != nil {
		// if recognise application name failed, use profileName as app_service
		key = segment.NewKey(make(map[string]string, 1))
		key.Add("__name__", name)
	}

	// use app-profile with `from` params
	begin := time.Unix(int64(profile.From), 0)
	if profile.Timestamp > 0 {
		// using ebpf-profile with `timestamp` nanoseconds parse
		begin = time.Unix(0, int64(profile.Timestamp))
	}
	end := time.Unix(int64(profile.Until), 0)

	return ingestion.Metadata{
		StartTime:       begin,
		EndTime:         end,
		SpyName:         profile.SpyName,
		Key:             key,
		SampleRate:      profile.SampleRate,
		Units:           metadata.Units(profile.Units),
		AggregationType: metadata.AggregationType(profile.AggregationType),
	}
}

// decompressData removes the compression layers selected by compressFlag from
// data and returns the result. The zstd layer (added by the agent sender) is
// removed first, then the gzip layer (added by the application profiler).
// With no flag set, data is returned unchanged.
//
// A failing layer is logged together with the length of the input that could
// not be decompressed, and whatever the decompressor returned is passed back
// to the caller, whose parser is then expected to reject it.
func (d *Decoder) decompressData(data []byte, compressFlag uint8) []byte {
	// zstdCompress comes from agent-sender
	if compressFlag&_ZSTD_COMPRESS_FLAG == _ZSTD_COMPRESS_FLAG {
		// keep the input in `data` until success so the error log reports the
		// compressed length rather than the length of the failed output
		out, err := profile_common.ZstdDecompress(d.decompressBuffer[:0], data)
		if err != nil {
			log.Errorf("decompress profile data failed, decompressType=%d, len=%d, err=%s", _ZSTD_COMPRESS_FLAG, len(data), err)
			return out
		}
		data = out
	}
	// gzipCompress comes from application-profiler
	if compressFlag&_GZIP_COMPRESS_FLAG == _GZIP_COMPRESS_FLAG {
		out, err := profile_common.GzipDecompress(data)
		if err != nil {
			log.Errorf("decompress profile data failed, decompressType=%d, len=%d, err=%s", _GZIP_COMPRESS_FLAG, len(data), err)
			return out
		}
		data = out
	}
	return data
}

// sendProfileData parses profile, using parser both as the storage putter and
// as the metrics exporter, and returns the parse error, if any.
//
// format is not needed for parsing: the concrete RawProfile type already
// selects the parser. It is kept in the signature so existing callers keep
// compiling.
func (d *Decoder) sendProfileData(profile ingestion.RawProfile, format string, parser *Parser, metadata ingestion.Metadata) error {
	// No IngestInput wrapper is built here: only its Profile field was ever
	// used, so calling Parse on the profile directly is equivalent.
	return profile.Parse(context.TODO(), parser, parser, metadata)
}

// generateUUID returns the first 8 characters of the string form of a newly
// generated UUID.
func generateUUID() string {
	const prefixLen = 8
	id := uuid.New().String()
	return id[:prefixLen]
}
